package com.atguigu.flink.sql.other;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Created by Smexy on 2023/7/25
 *
 */
public class Demo8_APIConvert
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                env.setParallelism(1);

        //使用DataStreamAPI 获取了一个流
        SingleOutputStreamOperator<WaterSensor> ds = env
                    .socketTextStream("hadoop102", 8888)
                    .map(new WaterSensorMapFunction())
                    ;

        SingleOutputStreamOperator<WaterSensor> ds1 = ds.map(w -> {
            w.setVc(w.getVc() + 10);
            return w;
        });

        //把流转换为Table
         Table table = tableEnv.fromDataStream(ds1);

        Table t1 = table
            .where($("id").isEqual("s1"))
            .select($("id"), $("vc"));

        //使用sql
        tableEnv.createTemporaryView("t1",t1);

        Table t2 = tableEnv.sqlQuery(" select id,sum(vc) sumVc from t1 group by id ");

        //玩流
        DataStream<Row> ds3 = tableEnv.toChangelogStream(t2);

        ds3
            .map(r -> {
                String id = r.<String>getFieldAs("id");
                Integer sumVc = r.<Integer>getFieldAs("sumVc");
                return id +":" + sumVc;
            })
                .print();


        env.execute();


    }
}
